package org.example.apitest.tableapi;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.types.Row;
import org.example.apitest.beans.SensorReading;

/**
 * Small demo that runs the same query over a sensor stream twice: once through the
 * Table API and once through SQL, printing both results to standard output.
 *
 * <p>NOTE(review): the string-based {@code select}/{@code where} expressions and
 * {@code toAppendStream} appear to be deprecated in newer Flink releases; confirm
 * against the Flink version this project builds with before migrating.
 */
public class Example {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails while being built or executed
     */
    public static void main(String[] args) throws Exception {
        // Streaming environment with a parallelism of 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Step 1: read the raw lines from the sensor file.
        DataStream<String> rawLines = env.readTextFile("src/main/resources/sensor.txt");

        // Step 2: turn every line into a SensorReading via its String constructor.
        MapFunction<String, SensorReading> toReading = SensorReading::new;
        DataStream<SensorReading> readings = rawLines.map(toReading);

        // Step 3: create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 4: expose the reading stream as a table.
        Table sensorTable = tableEnv.fromDataStream(readings);

        // Step 5: Table API variant - project two columns, then filter on the sensor id.
        Table projected = sensorTable.select("id,temperature");
        Table apiResult = projected.where("id='sensor_1'");

        // Step 6: SQL variant - register the table as a view and run the equivalent query.
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table sqlResult = tableEnv.sqlQuery("SELECT id,temperature FROM sensor where id='sensor_1'");

        // Step 7: convert both results to append streams and print them with distinct prefixes.
        DataStream<Row> apiRows = tableEnv.toAppendStream(apiResult, Row.class);
        apiRows.print("result");
        DataStream<Row> sqlRows = tableEnv.toAppendStream(sqlResult, Row.class);
        sqlRows.print("resultSQL");

        // Nothing runs until the job is submitted.
        env.execute();
    }
}
